Airflow Source Code Deep Dive, Part 12


Hive and Airflow

When connecting Airflow to Hive, the key point is that the machine Airflow runs on can actually reach Hive. In real-world development this connectivity is an essential precondition to verify first.
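A quick way to check this from the worker host is to push a trivial query through beeline. A minimal smoke-test sketch, where 'hive-host' and port 10000 are placeholders for your HiveServer2 address:

# Run on the Airflow worker host; raises CalledProcessError if the
# connection fails. Assumes beeline is on the PATH.
import subprocess

subprocess.check_call([
    'beeline',
    '-u', 'jdbc:hive2://hive-host:10000/default',
    '-e', 'SELECT 1',
])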

1. Maintain the hive_cli connection info (see the standard Airflow connection configuration); a sketch of creating such a connection follows after this list.

2. Understand HiveOperator.

3. Understand the Hive side. The overall idea is: connect to Hive, assemble the HQL, run the SQL inside Hive through beeline, and return the result. The only change is that the HQL you used to run directly on Hive is now expressed as Airflow Python code.
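For step 1, here is a minimal sketch of registering such a connection programmatically, assuming an Airflow 1.x environment. The conn_id matches the operator's default; host, port, login and the extras are placeholder values, and the same record can also be created in the web UI under Admin > Connections:

# Sketch: register a hive_cli connection in the Airflow metadata DB.
# Every value below except conn_id/conn_type is a placeholder.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id='hive_cli_default',   # the id HiveOperator looks up
    conn_type='hive_cli',
    host='hive-host',             # HiveServer2 host
    port=10000,
    schema='default',
    login='airflow',
    extra='{"use_beeline": true, "auth": "noSasl"}',
)

session = settings.Session()
session.add(conn)
session.commit()

With the connection in place, the operator and hook source below show how the HQL actually reaches Hive.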

class HiveOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            hql,
            hive_cli_conn_id='hive_cli_default',
            schema='default',
            hiveconf_jinja_translate=False,
            script_begin_tag=None,
            run_as_owner=False,
            mapred_queue=None,
            mapred_queue_priority=None,
            mapred_job_name=None,
            *args, **kwargs):
        super(HiveOperator, self).__init__(*args, **kwargs)
        self.hiveconf_jinja_translate = hiveconf_jinja_translate
        self.hql = hql
        self.schema = schema
        # hive_cli_conn_id is the key used to look up the real Hive
        # connection settings
        self.hive_cli_conn_id = hive_cli_conn_id
        self.script_begin_tag = script_begin_tag
        self.run_as = None
        if run_as_owner:
            self.run_as = self.dag.owner
        self.mapred_queue = mapred_queue
        self.mapred_queue_priority = mapred_queue_priority
        self.mapred_job_name = mapred_job_name

    def get_hook(self):
        # Build the hook that holds the actual hive/beeline connection
        return HiveCliHook(
            hive_cli_conn_id=self.hive_cli_conn_id,
            run_as=self.run_as,
            mapred_queue=self.mapred_queue,
            mapred_queue_priority=self.mapred_queue_priority,
            mapred_job_name=self.mapred_job_name)

    def execute(self, context):
        self.log.info('Executing: %s', self.hql)
        self.hook = self.get_hook()
        # Execute the real command through the hook
        self.hook.run_cli(hql=self.hql, schema=self.schema,
                          hive_conf=context_to_airflow_vars(context))


class HiveCliHook(BaseHook):

    def __init__(
            self,
            hive_cli_conn_id="hive_cli_default",
            run_as=None,
            mapred_queue=None,
            mapred_queue_priority=None,
            mapred_job_name=None):
        # Initialize parameters from the stored connection record
        conn = self.get_connection(hive_cli_conn_id)
        self.hive_cli_params = conn.extra_dejson.get('hive_cli_params', '')
        self.use_beeline = conn.extra_dejson.get('use_beeline', False)
        self.auth = conn.extra_dejson.get('auth', 'noSasl')
        self.conn = conn
        self.run_as = run_as

        if mapred_queue_priority:
            mapred_queue_priority = mapred_queue_priority.upper()
            if mapred_queue_priority not in HIVE_QUEUE_PRIORITIES:
                raise AirflowException(
                    "Invalid Mapred Queue Priority. Valid values are: "
                    "{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))

        self.mapred_queue = mapred_queue
        self.mapred_queue_priority = mapred_queue_priority
        self.mapred_job_name = mapred_job_name

    def _prepare_cli_cmd(self):
        """
        This function creates the command list from available information
        (it assembles the beeline invocation command).
        """
        conn = self.conn
        hive_bin = 'hive'
        cmd_extra = []

        if self.use_beeline:
            hive_bin = 'beeline'
            jdbc_url = "jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
            if configuration.get('core', 'security') == 'kerberos':
                template = conn.extra_dejson.get(
                    'principal', "hive/[email protected]")
                if "_HOST" in template:
                    template = utils.replace_hostname_pattern(
                        utils.get_components(template))

                proxy_user = ""  # noqa
                if conn.extra_dejson.get('proxy_user') == "login" and conn.login:
                    proxy_user = "hive.server2.proxy.user={0}".format(conn.login)
                elif conn.extra_dejson.get('proxy_user') == "owner" and self.run_as:
                    proxy_user = "hive.server2.proxy.user={0}".format(self.run_as)

                jdbc_url += ";principal={template};{proxy_user}"
            elif self.auth:
                jdbc_url += ";auth=" + self.auth

            jdbc_url = jdbc_url.format(**locals())

            cmd_extra += ['-u', jdbc_url]
            if conn.login:
                cmd_extra += ['-n', conn.login]
            if conn.password:
                cmd_extra += ['-p', conn.password]

        hive_params_list = self.hive_cli_params.split()

        return [hive_bin] + cmd_extra + hive_params_list

    def _prepare_hiveconf(self, d):
        """
        This function prepares a list of hiveconf params from a
        dictionary of key value pairs.

        :param d:
        :type d: dict

        Sample hive_conf usage:

        >>> hh = HiveCliHook()
        >>> hive_conf = {"hive.exec.dynamic.partition": "true",
        ... "hive.exec.dynamic.partition.mode": "nonstrict"}
        >>> hh._prepare_hiveconf(hive_conf)
        ["-hiveconf", "hive.exec.dynamic.partition=true",\
 "-hiveconf", "hive.exec.dynamic.partition.mode=nonstrict"]
        """
        if not d:
            return []
        return as_flattened_list(
            zip(["-hiveconf"] * len(d),
                ["{}={}".format(k, v) for k, v in d.items()])
        )

    def run_cli(self, hql, schema=None, verbose=True, hive_conf=None):
        conn = self.conn
        schema = schema or conn.schema
        if schema:
            hql = "USE {schema};\n{hql}".format(**locals())

        with TemporaryDirectory(prefix='airflow_hiveop_') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir) as f:
                f.write(hql.encode('UTF-8'))
                f.flush()
                # Assemble the beeline connection command
                hive_cmd = self._prepare_cli_cmd()
                hive_conf_params = self._prepare_hiveconf(hive_conf)
                # Based on the operator parameters and hive_conf, append
                # further -hiveconf settings such as mapreduce.job.queuename,
                # mapreduce.job.priority and mapred.job.name. See the beeline
                # CLI syntax; the point is that the assembled cmd must be a
                # valid hive/beeline invocation.
                if self.mapred_queue:
                    hive_conf_params.extend(
                        ['-hiveconf',
                         'mapreduce.job.queuename={}'
                         .format(self.mapred_queue)])
                if self.mapred_queue_priority:
                    hive_conf_params.extend(
                        ['-hiveconf',
                         'mapreduce.job.priority={}'
                         .format(self.mapred_queue_priority)])
                if self.mapred_job_name:
                    hive_conf_params.extend(
                        ['-hiveconf',
                         'mapred.job.name={}'
                         .format(self.mapred_job_name)])

                hive_cmd.extend(hive_conf_params)
                hive_cmd.extend(['-f', f.name])

                if verbose:
                    self.log.info(" ".join(hive_cmd))
                # Launch a subprocess to execute the command
                sp = subprocess.Popen(
                    hive_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    cwd=tmp_dir)
                self.sp = sp
                stdout = ''
                while True:
                    # Forward stdout line by line into the Airflow task log
                    line = sp.stdout.readline()
                    if not line:
                        break
                    stdout += line.decode('UTF-8')
                    if verbose:
                        self.log.info(line.decode('UTF-8').strip())
                sp.wait()

                if sp.returncode:
                    raise AirflowException(stdout)

                return stdout

    def test_hql(self, hql):
        """
        Test an hql statement using the hive cli and EXPLAIN
        (run an EXPLAIN of the hql inside the hive cli).
        """

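Putting it all together, here is a minimal sketch of a DAG that drives HiveOperator; dag_id, the schedule, the table, and the HQL are placeholders, and the import path follows Airflow 1.x:

# Sketch: a minimal DAG running HQL through HiveOperator (Airflow 1.x).
from datetime import datetime

from airflow import DAG
from airflow.operators.hive_operator import HiveOperator

dag = DAG(
    dag_id='hive_example',               # placeholder dag_id
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)

count_rows = HiveOperator(
    task_id='count_rows',
    hive_cli_conn_id='hive_cli_default', # the connection registered above
    schema='default',
    # hql is a templated field, so Jinja macros like {{ ds }} work here
    hql="SELECT COUNT(*) FROM my_table WHERE ds = '{{ ds }}';",
    mapred_queue='default',              # forwarded as mapreduce.job.queuename
    dag=dag,
)

When the task runs, execute() builds a HiveCliHook from hive_cli_default, and run_cli writes the rendered HQL to a temporary file and hands it to beeline with -f, exactly as traced in the source above.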
